package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

/**
 * @Description 将DataSet数据流插入到mysql表中
 * @Author JL
 * @Date 2020/09/18
 * @Version V1.0
 */
public class DataSetSink {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //插入sql
        String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)";
        //封装数据
        TUser user = new TUser();
        //数据库自增，设为0，在入库时会自增
        user.setId(0);
        user.setName("zhao3");
        user.setAge(23);
        user.setSex(0);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        //将实体转换为Row对象，new Row(字段个数)
        Row row = Row.of(user.getId(), user.getName(), user.getAge(), user.getSex(), user.getAddress(), user.getCreateTimeSeries());
        //转换为dataSet
        DataSet<Row> dataSet = env.fromElements(row);
        //配置jdbc，输出到mysql
        dataSet.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setQuery(sql)
                .finish());
        //执行
        env.execute("flink data to mysql");
    }

}
